package ucloud

import (
	"errors"
	"strings"

	"gitee.com/monitor_dev/ucloud_kafka_exporter/ucloud/ukafka"
)

type UkafkaCluster struct {
	Zone                *zoneInfo
	ClusterInstanceId   string
	ClusterInstanceName string
	FrameworkVersion    string
	UHostCount          int
	BrokerIP            []string
	ZookeeperIP         []string
}

func (uauth *authInfo) ukafkaClient() *ukafka.UKafkaClient {
	return ukafka.NewClient(uauth.cfg, uauth.cre)
}

func ukafkaResourceDesc(uclient *ukafka.UKafkaClient, zone zoneInfo, ch chan<- *UkafkaCluster) error {
	offset := 0
	limit := 50

	uKafkaReq := uclient.NewListUKafkaInstance()
	uKafkaReq.ProjectId = &zone.ProjectId
	uKafkaReq.Region = &zone.Region
	uKafkaReq.Zone = &zone.Zone
	uKafkaReq.Offset = &offset
	uKafkaReq.Limit = &limit

	uKafkaList, err := uclient.ListUKafka(uKafkaReq)
	if err != nil {
		return err
	}
	if uKafkaList.TotalCount == 0 {
		e := strings.Join([]string{
			"Project :",
			zone.ProjectName,
			", Zone :",
			zone.Zone,
			"no authorized kafka information ！",
		}, " ")
		return errors.New(e)
	}

	for i := 0; i < uKafkaList.TotalCount; i = i + limit {
		offset = i
		if offset > 0 {
			uKafkaList, _ = uclient.ListUKafka(uKafkaReq)
		}
		for _, cluster := range uKafkaList.ClusterSet {
			kafka := new(UkafkaCluster)
			kafka.Zone = &zone
			kafka.ClusterInstanceId = cluster.ClusterInstanceId
			kafka.ClusterInstanceName = cluster.ClusterInstanceName
			kafka.FrameworkVersion = cluster.FrameworkVersion
			kafka.UHostCount = cluster.UHostCount
			kafka.BrokerIP = make([]string, 0, cluster.UHostCount)
			uKafkaHostReq := uclient.NewDescribeUKafkaInstance()
			uKafkaHostReq.ProjectId = &zone.ProjectId
			uKafkaHostReq.Region = &zone.Region
			uKafkaHostReq.Zone = &zone.Zone
			uKafkaHostReq.ClusterInstanceId = &cluster.ClusterInstanceId
			uKafkaCluster, _ := uclient.DescribeUKafkaInstance(uKafkaHostReq)
			for _, host := range uKafkaCluster.ClusterSet[0].UHostSet {
				ip := strings.Join(
					[]string{
						host.IPSet[0].IP,
						"9092",
					}, ":",
				)
				zookeeperip := strings.Join(
					[]string{
						host.IPSet[0].IP,
						"2181",
					}, ":",
				)
				kafka.BrokerIP = append(kafka.BrokerIP, ip)
				kafka.ZookeeperIP = append(kafka.ZookeeperIP, zookeeperip)
			}

			ch <- kafka
		}
	}
	return nil
}
